Aller au contenu principal

Prefect: Deploy Flows on Work Pool

· 3 minutes de lecture

Prefect

Prefect is a workflow management system that allows you to define, schedule, and monitor workflows.

Go to Prefect

Introduction

In this post, we will learn how to deploy Prefect flows on a work pool, for triggering the flow via API.

Prerequisites

  • Python installed
  • Prefect installed
  • Prefect server running (cmd: prefect server start)

Steps

  1. Create a work pool
  2. Transform function to flow
  3. Deploy flow on work pool
  4. Trigger flow via API

Create a work pool

More about work pools

Create a Process work pool

prefect work-pool create --type process my-work-pool

Check the work pool

prefect work-pool ls

Start the work pool

prefect worker start --pool my-work-pool

Transform function to flow

Create a folder code

In this folder, create 2 files with one function each, who will be transformed to flow.

file 1: code/flow1.pyfile 2: code/flow2.py

from prefect import flow

@flow def my_flow_1():     print("This is flow 1")

my_flow_1.run()

from prefect import flow

@flow def my_flow_2():     print("This is flow 2")

my_flow_2.run()


With there files, if u want, you can run the flows with the command python code/flow1.py and python code/flow2.py.

Deploy flow on work pool

More about deploying flows

More about YAML configuration

Now we will deploy the flows on the work pool.

For easy deployment, we will create a YAML file with the flows configurations.

Create a file prefect.yaml with the following content:

# Generic metadata about this project
name: test_flows_deployment
prefect-version: 3.1.0

# build section allows you to manage and build docker images
build: null

# push section allows you to manage if and how this project is uploaded to remote locations
push: null

# pull section allows you to provide instructions for cloning this project in remote locations
pull:
- prefect.deployments.steps.set_working_directory:
directory: /path/to/your/project

# the deployments section allows you to provide configuration for deploying flows
deployments:
- name: flow1
version: null
tags: []
concurrency_limit: null
description: null
entrypoint: code/flow1.py
parameters: {}
work_pool:
name: my-work-pool
work_queue_name: null
job_variables: {}
enforce_parameter_schema: true
schedules: []

- name: flow2
version: null
tags: []
concurrency_limit: null
description: null
entrypoint: code/flow2.py
parameters: {}
work_pool:
name: my-work-pool
work_queue_name: null
job_variables: {}
enforce_parameter_schema: true
schedules: []

For deploying the flows based on the YAML file, run the following command:

prefect deploy --all

You can deploy a specific flow with the command:

prefect deploy --name flow1 --name flow2

Trigger flow via API

More about API

More about API Orchestration

Now we can trigger the flows via API.

import asyncio
from prefect.client.orchestration import get_client
from prefect.deployments import run_deployment

# with the client
async def call():
client = get_client()
await client.create_flow_from_name("flow1")

# OR with the deployment
async def call():
await run_deployment("flow1")

if __name__ == "__main__":
asyncio.run(call())